package com.guaoran.distributed.message.kafka.group;

import com.guaoran.distributed.message.kafka.common.CommonUtil;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author : guaoran
 * @Description : <br/>
 *  group.id
 *      consumer group是kafka提供的可扩展且具有容错性的消费者机制。
 *      既然是一个组，那么组内必然可以有多个消费者或消费者实例(consumer instance)，它们共享一个公共的ID，即group ID。
 *      组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。
 *      当然，每个分区只能由同一个消费组内的一个consumer来消费.
 *      如下图所示，分别有三个消费者，属于两个不同的group，那么对于firstTopic这个topic来说，这两个组的消费者都能同时消费这个topic中的消息，对
 *      于此事的架构来说，这个firstTopic就类似于ActiveMQ中的topic概念。
 *      如右图所示，如果3个消费者都属于同一个group，那么此事firstTopic就是一个Queue的概念
 *      //todo 分组：即当一个 topic 同时有两个 client-id时，代表他们是同一个分组，如果client-id不同，则代表是不同的分组。
 *      //todo 当 同一个topic 同时有个多个 client-id时，同时启动去消费生成者的消息会有以下两种情况，
 *          //todo 1.当分区只有一个的时候，两个消费者只有一个能消费到数据， 另一个消费不到数据，因为是在同一个分组下，存在竞争关系
 *          //todo 2.当有多个分区时，建议分区时消费者个倍数，这样的话，会保证每个消费者同时能消费到数据，这样可以减少消费者的压力。
 * @date :2018/11/9 16:00
 */
public class KafkaProducerGroupDemo extends Thread {
    private final static String CONNECT_URL = CommonUtil.BOOTSTRAP_SERVERS_CONFIG;
    private final KafkaProducer<Integer,String> producer;
    private final String topic;
    public KafkaProducerGroupDemo(String topic){
        this.topic = topic;
        Properties properties = new Properties();
        //连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,CONNECT_URL);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG,"kafka-group");
        properties.put(ProducerConfig.ACKS_CONFIG,"-1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<Integer, String>(properties);


    }

    @Override
    public void run() {
        int num = 0;
        while(num<500){
            String message = "message_"+num;
            System.out.println("begin...send..."+message);
            try {
                RecordMetadata recordMetadata = producer.send(new ProducerRecord<Integer, String>(topic,message)).get();
//                System.out.println("sync-offset:"+recordMetadata.offset()+
//                        "->partition:"+recordMetadata.partition());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            num++;
        }
    }

    public static void main(String[] args) {
        new KafkaProducerGroupDemo("group-test").start();
    }
}
